package org.niit.service

import java.text.SimpleDateFormat

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.niit.common.TService
import org.niit.dao.RealTimeDao
import org.niit.bean.AdClickData
import org.niit.util.MyKafkaUtil

class RealTimeService extends TService {

  /**
   * Analysis entry point for the real-time ad-click stream.
   *
   * @param data stream of parsed ad-click records (time, area, city, user, ad)
   * @return result of the real-time aggregation (Unit; `Any` is fixed by TService)
   */
  override def dataAnalysis(data: DStream[AdClickData]): Any = {
    // Real-time per-batch aggregation and persistence.
    reduceRealTimeCount(data)
  }

  /**
   * Aggregates clicks per (day, area, city, ad) within each micro-batch and
   * persists every aggregated count via [[RealTimeDao]].
   *
   * Record flow:
   *   AdClickData => (time, area, city, ad, user)
   *   key on (day, area, city, ad), count occurrences,
   *   then write each (key, count) pair to storage.
   *
   * @param adClickData stream of ad-click records; `ts` is an epoch-millis string
   */
  private def reduceRealTimeCount(adClickData: DStream[AdClickData]): Unit = {
    // mapPartitions so the SimpleDateFormat is created once per partition
    // instead of once per record (it is also not thread-safe, so it must
    // stay local to the task).
    val reduceDS = adClickData.mapPartitions { iter =>
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      iter.map { data =>
        // Truncate the epoch-millis timestamp to a calendar day.
        val day = sdf.format(new java.util.Date(data.ts.toLong))
        // ((day, area, city, ad), count)
        ((day, data.area, data.city, data.ad), 1)
      }
    }.reduceByKey(_ + _)

    reduceDS.foreachRDD { rdd =>
      // foreachPartition: build ONE DAO per partition rather than one per
      // record — the standard Spark pattern for per-partition connections.
      rdd.foreachPartition { iter =>
        val realTimeDao = new RealTimeDao
        iter.foreach {
          case ((day, area, city, ad), count) =>
            println(s"${day},${area},${city},${ad},${count}")
            realTimeDao.insertRealTimeAd(day, area, city, ad, count.toString)
        }
      }
    }
  }
}

